ClassCam+

Alex Coy (ac2456) and Jason Russo (jr826)
A product for ECE 5725's final project.

18 May 2020

Project Objective

This project aims to automatically point a camera at a lecturer while retaining the ability to control the camera manually, record video to a local destination, or stream it over a network.

Team Photo
Alex Coy (left) and Jason Russo (right)

Introduction

Starting on roughly April 1st, 2020, the team drew up plans for a simple pan/tilt camera platform in OpenSCAD. Coy printed the parts on a pair of inexpensive 3D printers and assembled them, drawing more parts as needed. The team developed software to control the pan/tilt platform, then added custom network control software. The team began exploring person recognition, but ran out of time to take it further.



Check out our demonstration video here!

Pan-Tilt-Zoom Camera

In this project, the team constructed their own Pan-Tilt-Zoom (PTZ) camera. PTZ cameras are noted for their ability to be controlled remotely. They can 'pan' left or right, 'tilt' up or down, and 'zoom' in or out. These three functions allow a PTZ camera to cover a wide area, which is key for the ClassCam+'s main application: tracking an individual as they move about a room.

OpenCV

This project also utilizes OpenCV, the "Open Source Computer Vision Library". This open-source computer vision and machine learning library provides facial detection and recognition, object identification, and person detection. The ClassCam+ uses OpenCV's person detection capabilities to track an individual as they move about the room.


Design

Design Overview

The following flowchart depicts the flow of information through the system. The client sends camera control commands to the Raspberry Pi. The Pi interprets these commands and relays them to the platform, which performs the movement and thereby points the camera. All the while, the Raspberry Pi receives the camera feed and streams it out to all of the clients.

flowchart
Simple flowchart of information flow in the system.

Physical Platform

Every piece of the custom pan/tilt platform was drawn in OpenSCAD with the constraint that the parts should be mostly 3D-printable, since 3D printing was the primary means available for making custom parts. The team used an assembly file to keep track of how the key structural parts fit together.

Pan/Tilt Platform
Fig. 1: The core pan/tilt platform assembly used to check fit
Pan/Tilt Platform
Fig. 2: The final pan/tilt platform assembly in the home lab

See the source repository for the OpenSCAD files used, plus the required OpenSCAD libraries. We chose stepper motors because they were already available in the home lab, and because their robustness at low cost suits this design. We chose to control the stepper motors with an Arduino rather than with the Pi because it is very hard to make the Pi perform any sort of true real-time operation. Using the Arduino as a motor coprocessor lets the Pi spend minimal computing power on controlling the motors. Lastly, we chose infrared optical limit switches, each consisting of an IR LED and a reverse-biased IR photodiode, because the home lab did not have any traditional limit switches.
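On the Pi side, the Arduino coprocessor appears as a USB serial device, and all motion goes through the PTZ abstraction in ptz_controller.py (listed at the bottom of this page). A minimal usage sketch, assuming the Arduino enumerates at /dev/ttyACM0 as it did in our setup:

from ptz_controller import PTZ, Commands

# Open the Arduino motor coprocessor over USB serial and move the
# platform to its midpoint (both axes run from 0 to 10240).
thePTZ = PTZ("/dev/ttyACM0")
thePTZ.command(Commands.CMD_SETCOORD, 5120, 5120)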

Client

The client connects to the server by running client.py, which creates a thread to connect to the server and receive the camera feed.

class myVC():
    def __init__(self):
        context = zmq.Context()
        self.socket = context.socket(zmq.SUB)
        self.socket.connect('tcp://alexcoy.duckdns.org:5002')
        self.socket.setsockopt_string(zmq.SUBSCRIBE, np.unicode(''))

    def getFrame(self):
        image_string = self.socket.recv_string()
        raw_image = base64.b64decode(image_string)
        image = np.frombuffer(raw_image, dtype=np.uint8)
        frame = cv2.imdecode(image, 1)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return frame

An application class is used to present a GUI to the user that contains the camera feed.
class SerialApp(Frame):
    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.initWidgets()
        self.errorMsg = False

        self.vid = myVC()
        self.canvas = Canvas(self, width=640, height=480)

        self.canvas.pack()
        self.outgoing = makeCommandBytes(Commands.CMD_GETCOORD, 25, 30)
        self.after(15, self.paintCanvas)
This produces the following view for the user.
client
Client GUI before connecting to enable camera control.
Users can then connect to the camera server and control the camera remotely. Upon connecting, the GUI updates to display various control buttons, as shown below.
client_gui
Client GUI with all camera controls.
Interacting with these buttons sends a command to the server, which then moves the camera.
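Each button press is serialized by makeCommandBytes (from ptz_controller.py, listed at the bottom of this page) into a small byte packet. For example, a SETCOORD command targeting the platform midpoint looks like this:

from ptz_controller import Commands, makeCommandBytes

# A SETCOORD packet is seven bytes:
# [cmd, 0, x_low, x_high, y_low, y_high, CMD_END]
pkt = makeCommandBytes(Commands.CMD_SETCOORD, 5120, 5120)
print([hex(b) for b in pkt])
# ['0x3a', '0x0', '0x0', '0x14', '0x0', '0x14', '0xaf']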

Person Detection

We created a custom Detection class, shown below, that receives frames and processes them using OpenCV's HOG (Histogram of Oriented Gradients) descriptor with the defaultPeopleDetector.

class Detection(threading.Thread):
    def __init__(self):
        # init HOG descriptor/person detection. HOG is Histogram of
        # Oriented Gradients. It is used for image detection. In our case
        # it will be used for people detection.
        threading.Thread.__init__(self)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
        self.frame = False
        self.newFrame = False
        self.boxes = np.array([])

The Detection class runs the following detect function on a given frame to search for people within it. The frame is converted to grayscale in order to speed up the image processing. Then the HOG detection function is called, returning bounding boxes for the detected people. The boxes are formatted as NumPy arrays and returned to be displayed on screen.

def detect(self, frame):
    # use greyscale for faster detection
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    # detect people in the image;
    # returns the bounding boxes for the detected objects
    boxes, weights = self.hog.detectMultiScale(gray, winStride=(8, 8))
    # convert (x, y, w, h) boxes to (x1, y1, x2, y2) corners
    boxes = np.array([[x, y, x + w, y + h] for (x, y, w, h) in boxes])
    return boxes

The AutoUpdate function is used when the camera is in auto mode in order to move the camera and follow along with the person on screen. This function evaluates the boxes stored in self.boxes (updated using the detect function), computes the midpoint of the bounding boxes on screen, and compares it to the center of the camera. If the bounding boxes are beyond a defined threshold in any direction, a command is sent to the camera to move one step in that direction. The threshold exists to keep the system from oscillating about that point. We also chose to move in steps instead of simply inputting the midpoint and moving there, because that could lead to the camera jumping wildly about the screen and moving too fast.

def AutoUpdate(self, thePTZ):
    # running averages of box midpoints
    avg_x = []
    avg_y = []
    new_x = 0
    new_y = 0
    if (self.boxes.size == 0):
        return
    # loop through boxes
    for i in self.boxes:
        # midpoint of each bounding box
        avg_x.append((i[0] + i[2]) / 2)
        avg_y.append((i[1] + i[3]) / 2)
    for x in avg_x:
        new_x = new_x + x
    if (len(avg_x) > 0):
        new_x = new_x / len(avg_x)
    for y in avg_y:
        new_y = new_y + y
    if (len(avg_y) > 0):
        new_y = new_y / len(avg_y)
    framex = self.frame.shape[1]
    framey = self.frame.shape[0]
    # compute difference between camera center and bounding box center
    xdif = framex / 2 - new_x
    ydif = framey / 2 - new_y

Another option that was explored for person detection was face detection instead of full-body detection. This idea was inspired by the Automated Lamp project from this very course in Fall 2019. Unfortunately, the team did not have enough time to fully explore this option or to refine the person detection to provide more consistent results.
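For reference, a minimal sketch of what the face-detection alternative might have looked like, using OpenCV's bundled Haar cascade. This code was not part of the final system, and the cascade path assumes a standard opencv-python installation:

import cv2

# Load OpenCV's pretrained frontal-face Haar cascade.
face_cascade = cv2.CascadeClassifier(
    cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

def detect_faces(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)
    # convert (x, y, w, h) boxes to the (x1, y1, x2, y2) corner
    # format that AutoUpdate expects
    return [(x, y, x + w, y + h) for (x, y, w, h) in faces]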


Testing

The team was able to do a significant amount of system testing by connecting to the system remotely. Coy ran the server at his house while Russo connected from his. Russo could then send commands to the camera and both teammates could observe the system's response. For people detection, Coy positioned himself in front of the camera and the team observed as the system attempted to track his movement. It is hard to write a conventional test suite for a camera system such as this one, so most functionality testing had to be done by exercising the functions and observing the response.
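In that spirit, most checks were quick scripts rather than a formal suite. A hypothetical example of the kind of smoke test run by hand, using the myVC class from client.py:

from client import myVC

# Grab one frame from the remote feed and confirm the server is
# resizing to 640x480 as expected.
vc = myVC()
frame = vc.getFrame()
assert frame.shape == (480, 640, 3)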

Late Night Testing Evidence
Fig. 3: Testing the default HOG people detection function

Dead-end: Pure Web Interface

In our code repository, there are remnants of a web application. The initial idea for this project was for it to be something that any device with a web browser could use. As such, a simple PHP application, runnable on the Pi, exists. The main snag we ran into was creating a live stream: it turns out that encoding browser-streamable video on the fly is hard. We could find no tool that can take the hardware-compressed h.264 stream from the Pi's VideoCore IV GPU, remux it into a fragmented format, and segment the fragments in one pass. We experimented with a two-pass solution, but ultimately scrapped it because of astronomical latency.
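For the curious, the scrapped two-pass experiment looked roughly like the following; the exact flags here are illustrative rather than the precise pipeline we ran:

import subprocess

# Sketch of the two-pass idea: raspivid emits the hardware-encoded
# h.264 stream, and ffmpeg remuxes it into HLS segments that a
# browser player can fetch. Each pass adds buffering, hence the
# astronomical latency.
subprocess.run(
    'raspivid -t 0 -o - | '
    'ffmpeg -i - -c copy -f hls -hls_time 2 stream.m3u8',
    shell=True)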


Results

The team was able to achieve most of the original project goals. By the end, the team had constructed a camera streaming system that could be streamed to multiple viewers at the same time and controlled remotely by one viewer. The controller could drive the camera manually or enter an 'auto' mode that tracks a person as they move about the room. The project's shortcomings fell in the person detection: the system does not reliably detect a person on the screen, so the camera sometimes fails to follow the individual as they move about, or even detects an inanimate object as a person and moves toward it instead. The camera feed is reliable, however, and can be viewed remotely with little latency, and the camera responds to remote commands with little latency as well. The team was able to construct an efficient and reliable network setup for the ClassCam+, a pivotal part of the project's design.


Conclusion

Overall, the project was a success. The team was able to continue working despite the setbacks caused by COVID-19 and the move off campus. The team explored different options for the project, even if they didn't all fully pan out (such as the browser view, and facial detection as opposed to full-body detection). The team faced some additional setbacks with OpenCV setup, but was able to move past these as well. In the end, the team was running OpenCV's defaultPeopleDetector to recognize a person. Although it did not always produce reliable results, the team made good progress and sees this as an area for future improvement.


Future Work

The team sees multiple areas of future improvement for the project. Perhaps most importantly, the team would work to make the person detection more reliable, or even switch to another method of detection (such as facial detection, as mentioned earlier). This would help the ClassCam+ reach its desired state of a fully automated lecture streaming system.

Another area of improvement is saving lecture videos. As of now, video is only streamed and is then lost. With more time, the team would explore means of saving the video, giving students and professors recordings of the lectures for future reference. This idea was originally considered for the project, but could not be fully explored due to time constraints.

Lastly, the team would consider different ways of making the stream more accessible. This was the original intent of the browser streaming, but that feature could not be implemented given the design of the system. To get it working, the team may have to make significant changes to the system structure. With enough time, however, this may be a worthwhile endeavor.


Budget

Item                                            Cost
28BYJ-48 Stepper Motor/Driver Board (2 of 5)    $4.80
Arducam OV5647 Camera with CS Mount Lens        $30.00
0.75 kg Black 3D Printer Filament               at most $15.00
Elegoo Uno R3                                   $13.00
IR LEDs and Photodiodes                         $0.50
Jumper and normal wire                          $1.00
M3 Screws and Standoffs                         $1.00
Total Cost                                      $65.30

Work Distribution

Russo focused primarily on OpenCV. He set up the person detection and the camera response for tracking a person as they move about the room. He also wrote the video tracking script and contributed to the Design, Results, Conclusion, and Future Work sections of the website.

Coy focused primarily on the network, responsiveness, and physical portions of the project. He wrote and/or reviewed the "guts" of anything involving networking or physical hardware, such as the video networking code (which was adapted from an unattributed example of a pub/sub model for OpenCV frame sharing; sorry!) and the PTZ abstraction (completely his own, built from reading library documentation).


References

Automated Lamp Fall 2019
Real-time Human Detection with OpenCV
PTZ Definition
OpenCV about
Python 3 documentation for PyZmq, OpenCV, PySerial, Socket, and all other libraries included with an import statement.
Elegoo stepper driver example code from the product listing
OpenCV to Tkinter window example

Code

This project features a rather large code base. The entire code base can be found in this GitHub repo: PTZ Project Repo. Below are a few of the key files used in this project.

client.py

#! /usr/bin/env python3
from tkinter import *
from tkinter.ttk import *
from ptz_controller import Commands, makeCommandBytes
import socket
import cv2
import zmq
import base64
import numpy as np
import PIL.Image
import PIL.ImageTk

jDist = 768


class myVC():
    def __init__(self):
        context = zmq.Context()
        self.socket = context.socket(zmq.SUB)
        self.socket.connect('tcp://alexcoy.duckdns.org:5002')
        self.socket.setsockopt_string(zmq.SUBSCRIBE, np.unicode(''))

    def getFrame(self):
        image_string = self.socket.recv_string()
        raw_image = base64.b64decode(image_string)
        image = np.frombuffer(raw_image, dtype=np.uint8)
        frame = cv2.imdecode(image, 1)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return frame


class SerialApp(Frame):
    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.initWidgets()
        self.errorMsg = False

        self.vid = myVC()
        self.canvas = Canvas(self, width=640, height=480)

        self.canvas.pack()
        self.outgoing = makeCommandBytes(Commands.CMD_GETCOORD, 25, 30)
        self.after(15, self.paintCanvas)

    def paintCanvas(self):
        frame = self.vid.getFrame()
        self.photo = PIL.ImageTk.PhotoImage(image=PIL.Image.fromarray(frame))
        self.canvas.create_image(0, 0, image=self.photo, anchor=NW)
        self.after(15, self.paintCanvas)

    def initWidgets(self):
        self.bottomFrame = Frame(self)
        self.bottomFrame.pack(side="bottom")

        self.startButton = Button(self.bottomFrame)
        self.startButton["text"] = "Connect"
        self.startButton["command"] = self.try_connect

        self.aChoice = Entry(self.bottomFrame)
        self.aChoice["width"] = 20
        self.aLabel = Label(self.bottomFrame)
        self.aLabel["text"] = "Connect to: "
        self.startButton.pack(side="left")
        self.aLabel.pack(side="left")
        self.aChoice.pack(side="left")

    def homeOrJog(self):
        self.startButton["text"] = "Home"
        self.startButton["command"] = (self.doHome)

        self.midFrame = Frame(self)
        self.xyGroup = Frame(self.midFrame)
        self.xGroup = Frame(self.xyGroup)
        self.yGroup = Frame(self.xyGroup)

        self.midFrame.pack(side="top")
        self.goButton = Button(self.midFrame)
        self.goButton["text"] = "Go"
        self.goButton["width"] = 2
        self.goButton["command"] = self.doSet

        self.panCCWButton = Button(self.midFrame)
        self.panCCWButton["text"] = "L"
        self.panCCWButton["width"] = 2
        self.panCCWButton["command"] = self.panCCW

        self.panCWButton = Button(self.midFrame)
        self.panCWButton["text"] = "R"
        self.panCWButton["width"] = 2
        self.panCWButton["command"] = self.panCW

        self.tiltCCWButton = Button(self.midFrame)
        self.tiltCCWButton["text"] = "D"
        self.tiltCCWButton["width"] = 2
        self.tiltCCWButton["command"] = self.tiltCCW

        self.tiltCWButton = Button(self.midFrame)
        self.tiltCWButton["text"] = "U"
        self.tiltCWButton["width"] = 2
        self.tiltCWButton["command"] = self.tiltCW

        self.autoButton = Button(self.midFrame)
        self.autoButton["text"] = "auto"
        self.autoButton["width"] = 6
        self.autoButton["command"] = self.auto

        self.xEntry = Entry(self.xGroup)
        self.xEntry["width"] = 5
        self.xLabel = Label(self.xGroup)
        self.xLabel["text"] = "Pan: "
        self.xEntry.pack(side="right")
        self.xLabel.pack(side="left")

        self.yEntry = Entry(self.yGroup)
        self.yEntry["width"] = 5
        self.yLabel = Label(self.yGroup)
        self.yLabel["text"] = "Tilt: "
        self.yEntry.pack(side="right")
        self.yLabel.pack(side="left")

        self.xGroup.pack(side="left")
        self.yGroup.pack(side="right")

        self.xyGroup.pack(side="left")
        self.autoButton.pack(side="right")
        self.goButton.pack(side="right")
        self.panCWButton.pack(side="right")
        self.panCCWButton.pack(side="right")
        self.tiltCWButton.pack(side="right")
        self.tiltCCWButton.pack(side="right")

        self.midFrame.pack()
        self.bottomFrame.pack()

    def try_connect(self):
        print("Trying to connect to " + self.aChoice.get())
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket.setdefaulttimeout(0.25)
        try:
            self.sock.connect((self.aChoice.get(), 5000))
            self.after(250, self.heartbeat)
            self.actualMessage = False
            self.processingSet = False
            self.homeOrJog()
        except:
            if (self.errorMsg):
                self.errorMsg.pack_forget()
            self.errorMsg = Label(self)
            self.errorMsg["text"] = "Couldn't connect"
            self.errorMsg.pack()

    def heartbeat(self):
        if (self.actualMessage):
            self.actualMessage = False
        else:
            self.sock.send(self.outgoing)
        m_in = self.sock.recv(256)

        if (len(m_in) > 5 and m_in[0] == Commands.CMD_GETCOORD.value):
            # bytes 2-5 hold pan and tilt, little-endian
            # (see makeCommandBytes in ptz_controller.py)
            x = (m_in[3] << 8) + m_in[2]
            y = (m_in[5] << 8) + m_in[4]
            self.xEntry.delete(0, END)
            self.xEntry.insert(0, str(x))
            self.yEntry.delete(0, END)
            self.yEntry.insert(0, str(y))
            self.outgoing = makeCommandBytes(Commands.CMD_OK, 25, 30)

        self.after(250, self.heartbeat)

    def replaceX(self, x):
        self.xEntry.delete(0, END)
        self.xEntry.insert(0, str(x))

    def replaceY(self, y):
        self.yEntry.delete(0, END)
        self.yEntry.insert(0, str(y))

    def doSet(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD,
            int(self.xEntry.get()),
            int(self.yEntry.get())))
        self.actualMessage = True

    def tiltCCW(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD,
            int(self.xEntry.get()),
            int(self.yEntry.get()) - jDist))
        self.replaceY(int(self.yEntry.get()) - jDist)
        self.actualMessage = True

    def tiltCW(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD,
            int(self.xEntry.get()),
            int(self.yEntry.get()) + jDist))
        self.replaceY(int(self.yEntry.get()) + jDist)
        self.actualMessage = True

    def panCCW(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD,
            int(self.xEntry.get()) - jDist,
            int(self.yEntry.get())))
        self.replaceX(int(self.xEntry.get()) - jDist)
        self.actualMessage = True

    def panCW(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD,
            int(self.xEntry.get()) + jDist,
            int(self.yEntry.get())))
        self.replaceX(int(self.xEntry.get()) + jDist)
        self.actualMessage = True

    def auto(self):
        self.sock.send(makeCommandBytes(Commands.CMD_SETAUTO, 0, 0))

    def doHome(self):
        self.sock.send(makeCommandBytes(Commands.CMD_HOME, 0, 0))
        self.replaceX(5120)
        self.replaceY(5120)
        self.actualMessage = True


def main():
    top = Tk()
    app = SerialApp(master=top)
    top.mainloop()
    quit()


if (__name__ == "__main__"):
    main()

otherServer.py

#!/usr/bin/env python3

import socket
import threading          # Import socket module
from ptz_controller import PTZ, Commands, makeCommandBytes
import time
import base64
import cv2
import zmq
from video import Detection


class VCThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        context = zmq.Context()
        self.imgsocket = context.socket(zmq.PUB)
        self.imgsocket.bind('tcp://*:5002')

        self.detector = Detection()
        self.detector.start()

        self.camera = cv2.VideoCapture(0)
        self.go = True

    class VCException(Exception):
        pass

    def run(self):
        while (self.go):
            ret, frame = self.camera.read()
            self.detector.setFrame(frame)
            boxes = self.detector.getCurrentBoxes()
            if (thePTZ.Auto == True):
                self.detector.AutoUpdate(thePTZ)
            if (boxes.size > 0):
                cv2.rectangle(
                    frame, (boxes[0][0], boxes[0][1]),
                    (boxes[0][2], boxes[0][3]), (255, 0, 0), 10)
            frame = cv2.resize(frame, (640, 480))
            encoded, buf = cv2.imencode('.jpg', frame)
            image = base64.b64encode(buf)
            self.imgsocket.send(image)

        self.camera.release()
        cv2.destroyAllWindows()

    def stopThread(self):
        self.go = False


class ClientThread(threading.Thread):
    def __init__(self, clientsocket, clientAddress, isPrimary):
        threading.Thread.__init__(self)
        self.csocket = clientsocket
        self.caddr = clientAddress
        print("New connection added: ", clientAddress)
        self.runAuto = True

    def run(self):
        global primary_c
        while True:
            try:
                self.csocket.send(bytes([Commands.CMD_OK.value]))
                time.sleep(0.25)
                data = self.csocket.recv(2048)
            except socket.timeout:
                self.csocket.close()
                print("Timeout")
                primary_c = False
                break
            except BrokenPipeError:
                print("Broken Pipe")
                primary_c = False
                break
            except ConnectionResetError:
                print("Connection Reset")
                primary_c = False
                break

            result = self.parseCommand(data)
            if (result == 0):
                break
            if (result == Commands.CMD_NOPE.value):
                print("nope")
        print("Client at ", self.caddr, " disconnected...")

    def parseCommand(self, m_in):
        global thePTZ, primary_c
        error = False
        end = len(m_in)
        if (end < 1):
            error = True
        elif (m_in[0] == Commands.CMD_OK.value):
            pass
        elif (m_in[0] == Commands.CMD_HOME.value):
            print("Home")
            thePTZ.command(Commands.CMD_HOME)
        elif (m_in[0] == Commands.CMD_SETAUTO.value):
            thePTZ.command(Commands.CMD_SETAUTO)
        elif (m_in[0] == Commands.CMD_SETCOORD.value):
            if (end < 6):
                error = True
            else:
                x = (m_in[3] << 8) + m_in[2]
                y = (m_in[5] << 8) + m_in[4]
                print("Move to " + str(x) + ", " + str(y))
                thePTZ.command(Commands.CMD_SETCOORD, x, y)

        elif (m_in[0] == Commands.CMD_GETCOORD.value):
            toSend = makeCommandBytes(
                Commands.CMD_GETCOORD, thePTZ.pan, thePTZ.tilt)
            print(f'Getcoord with pan = {thePTZ.pan: d}. ')
            try:
                self.csocket.send(toSend)
            except socket.timeout:
                self.csocket.close()
                print("Timeout")
                primary_c = False
                return 0
            except BrokenPipeError:
                print("Broken Pipe")
                primary_c = False
                return 0
            except ConnectionResetError:
                print("Connection Reset")
                primary_c = False
                return 0
        return Commands.CMD_NOPE.value if error else Commands.CMD_DONE.value


s = socket.socket()
s.bind(('', 5000))
s.listen(5)
primary_c = False
thePTZ = PTZ("/dev/ttyACM0")
videoThread = VCThread()
videoThread.start()

c = ''
try:
    while True:
        c, addr = s.accept()     # Establish connection with client.
        c.settimeout(2.0)
        if (not primary_c):
            newthread = ClientThread(c, addr, True)
            primary_c = True
            newthread.start()
        else:
            print('Got connection from', addr)
            c.send(b'Busy')
            c.close()                # Close the connection

except KeyboardInterrupt:
    if (c != ''):
        c.close()
    s.close()
    videoThread.stopThread()
ptz_controller.py

import serial
from enum import Enum
from time import sleep


def makeCommandBytes(cmd, x=0, y=0):
    if cmd == Commands.CMD_SETCOORD or cmd == Commands.CMD_GETCOORD:
        # 7-byte packet: [cmd, 0, x_low, x_high, y_low, y_high, CMD_END]
        toExec = bytes([cmd.value, 0, x & 0xff, (x & 0xff00) >> 8,
                        y & 0xff, (y & 0xff00) >> 8,
                        Commands.CMD_END.value])
    else:
        toExec = bytes([cmd.value, Commands.CMD_END.value])
    return toExec


class PTZ:
    def __init__(self, port):
        self.ser = serial.Serial(
            port=port,
            baudrate=115200
        )
        self.pan = 5120
        self.tilt = 5120
        sleep(3)
        cmd = bytes([Commands.CMD_GIVE_HWINFO.value, Commands.CMD_END.value])
        self.Auto = True
        self.ser.reset_input_buffer()
        self.ser.write(cmd)
        rec = self.ser.read(2)
        if (rec == cmd):
            self.ser.write(bytes([Commands.CMD_OK.value]))

            info = self.ser.read(3)
            print(f"Version {info[0]:d} with {info[1]:d} axes")
            if (info[2] == Commands.CMD_DONE.value):
                self.status = "OK"
            else:
                self.status = "FAIL"

        if (False):  # bench-test loop for exercising both axes
            for _ in range(0, 20):
                self.command(Commands.CMD_JOG_PAN_CCW)
                self.command(Commands.CMD_JOG_TILT_CCW)
                self.command(Commands.CMD_JOG_PAN_CW)
                self.command(Commands.CMD_JOG_TILT_CW)
                self.command(Commands.CMD_JOG_PAN_CW)
                self.command(Commands.CMD_JOG_TILT_CW)
                self.command(Commands.CMD_JOG_PAN_CCW)
                self.command(Commands.CMD_JOG_TILT_CCW)

    def command(self, cmd, x=0, y=0):
        # clamp coordinates to the platform's travel, 0..10240
        x = x if x <= 10240 and x >= 0 else 10240 if x > 10240 else 0
        y = y if y <= 10240 and y >= 0 else 10240 if y > 10240 else 0
        if (cmd == Commands.CMD_SETAUTO):
            self.Auto = not self.Auto
            return
        if (cmd == Commands.CMD_SETCOORD):
            self.pan = x
            self.tilt = y
        toExec = makeCommandBytes(cmd, x, y)
        self.ser.write(toExec)
        rec = self.ser.read(len(toExec))
        # the Arduino echoes each command; retry until the echo matches
        while (rec != toExec):
            self.ser.reset_input_buffer()
            sleep(1)
            self.ser.write(toExec)
            rec = self.ser.read(len(toExec))
        self.ser.write(bytes([Commands.CMD_OK.value]))
        rec = self.ser.read(1)
        assert rec[0] == Commands.CMD_DONE.value, \
            "Wanted " + hex(Commands.CMD_DONE.value) + ", Got " + hex(rec[0])

    def home(self):
        self.command(Commands.CMD_HOME)
        self.pan = 5120
        self.tilt = 5120

class Commands(Enum):
    CMD_OK = 0xAA
    CMD_END = 0xAF
    CMD_HOME = 0xBF
    CMD_JOG_PAN_CCW = 0x1A
    CMD_JOG_TILT_CCW = 0x1B
    CMD_JOG_PAN_CW = 0x2A
    CMD_JOG_TILT_CW = 0x2B
    CMD_GIVE_HWINFO = 0xD3
    CMD_SETCOORD = 0x3A
    CMD_GETCOORD = 0x3B
    CMD_DONE = 0xFA
    CMD_NOPE = 0xFC
    CMD_SETAUTO = 0xFE

video.py

#!/usr/bin/env python3

import numpy as np
import cv2
import threading
import socket
import time
import zmq
import base64
from ptz_controller import Commands

jDist = 40

# code based on thedatafrog.com/en/articles/human-detection-video/

# thread class used to run video in a separate thread
# (thePTZ is expected to be defined by the importing module; see otherServer.py)


class VCThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        context = zmq.Context()
        self.imgsocket = context.socket(zmq.PUB)
        self.imgsocket.bind('tcp://*:5002')

        self.camera = cv2.VideoCapture(0)
        self.go = True

        self.detector = Detection()
        self.detector.start()

    class VCException(Exception):
        pass

    def run(self):
        while (self.go):
            ret, frame = self.camera.read()
            self.detector.setFrame(frame)
            boxes = self.detector.getCurrentBoxes()
            if (boxes.size > 0):
                cv2.rectangle(
                    frame, (boxes[0][0], boxes[0][1]),
                    (boxes[0][2], boxes[0][3]), (255, 0, 0), 10)
            frame = cv2.resize(frame, (640, 480))
            encoded, buf = cv2.imencode('.jpg', frame)
            image = base64.b64encode(buf)
            self.imgsocket.send(image)
            if (thePTZ.Auto == True):
                self.detector.AutoUpdate(thePTZ)
        self.camera.release()
        cv2.destroyAllWindows()

    def stopThread(self):
        self.go = False


class Detection(threading.Thread):
    def __init__(self):
        # init HOG descriptor/person detection. HOG is Histogram of
        # Oriented Gradients. It is used for image detection. In our case
        # it will be used for people detection.
        threading.Thread.__init__(self)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
        self.frame = False
        self.newFrame = False
        self.boxes = np.array([])

    def run(self):
        while (True):
            if (not self.newFrame):
                time.sleep(0.1)
            else:
                self.boxes = self.detect(self.frame)
                self.newFrame = False

    def setFrame(self, frame):
        if (not self.newFrame):
            self.frame = frame
            self.newFrame = True

    def getCurrentBoxes(self):
        return np.copy(self.boxes)

    def detect(self, frame):
        # use greyscale for faster detection
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        # detect people in the image;
        # returns the bounding boxes for the detected objects
        boxes, weights = self.hog.detectMultiScale(gray, winStride=(8, 8))
        # convert (x, y, w, h) boxes to (x1, y1, x2, y2) corners
        boxes = np.array([[x, y, x + w, y + h] for (x, y, w, h) in boxes])
        return boxes

    def AutoUpdate(self, thePTZ):
        # running averages of box midpoints
        avg_x = []
        avg_y = []
        new_x = 0
        new_y = 0
        if (self.boxes.size == 0):
            return
        # loop through boxes (a person may have multiple boxes; could be
        # problematic with multiple people on screen)
        for i in self.boxes:
            # midpoint of each bounding box
            avg_x.append((i[0] + i[2]) / 2)
            avg_y.append((i[1] + i[3]) / 2)
        for x in avg_x:
            new_x = new_x + x
        if (len(avg_x) > 0):
            new_x = new_x / len(avg_x)
        for y in avg_y:
            new_y = new_y + y
        if (len(avg_y) > 0):
            new_y = new_y / len(avg_y)
        framex = self.frame.shape[1]
        framey = self.frame.shape[0]
        # compute difference between camera center and bounding box center
        xdif = framex / 2 - new_x
        ydif = framey / 2 - new_y
        dx = 0
        dy = 0
        if (abs(xdif) > jDist):
            # if person to the right
            if (xdif < 0):
                # move right
                dx = jDist
            else:
                # move left
                dx = -jDist

        # if ydif worth moving
        if (abs(ydif) > jDist):
            # if person is up
            if (ydif < 0):
                # look up
                dy = -jDist
            else:
                # look down
                dy = jDist

        thePTZ.command(Commands.CMD_SETCOORD,
                       thePTZ.pan + dx, thePTZ.tilt + dy)